Kernel Optimizations for High‑Concurrency Messaging

ThreadLocal → RequestId Mapping · Lock‑free SPSC Ring Buffer · Off‑heap DirectBuffer

Java 17+ · Netty · Lock‑free · Pooled Direct Memory
  • Concurrent requests: RequestId → CompletableFuture mapping supports many in‑flight requests per connection
  • Throughput: an SPSC ring buffer replaces ConcurrentLinkedQueue (CLQ), with a hybrid spin‑then‑block wait strategy
  • Low GC pressure: pooled DirectByteBuffer with zero‑copy send

00 Product Overview

AeroMQ is a high‑performance message queue built for high‑concurrency, low‑latency workloads. It combines Netty asynchronous I/O, lock‑free SPSC ring buffers, and pooled off‑heap memory to provide a stable, observable messaging channel.

  • 100k+ msg/s: small‑message throughput
  • P99 < 10 ms: end‑to‑end latency
  • 1000+: concurrent connections

01 Key Features

  • Concurrent Request Mapping: RequestId → CompletableFuture
  • SPSC Ring: lock‑free, hybrid wait
  • Off‑heap: pooled DirectBuffer
  • Netty: async I/O
  • Benchmark: CSV + charts
  • Observability: stats & logs

02 Architecture & Layout

AeroMQ/
├─ aeromq-protocol     # Protocol & Commands
├─ aeromq-core         # Broker Core
├─ aeromq-client       # Client SDK
├─ aeromq-benchmark    # Benchmark Suite
└─ docs                # Static Docs
Clients → Protocol / Netty → SPSC Queues → Off‑heap Pool

03 Modules Overview

  • aeromq-protocol: defines the protocol and command set
  • aeromq-core: broker core (SPSC queues + off‑heap memory)
  • aeromq-client: client SDK with request mapping
  • aeromq-benchmark: benchmarks and visualization
  • docs: static docs and samples

03A Typical Use Cases

  • Real‑time monitoring and alerting: low latency, high fan‑out
  • Transactional and settlement events: at‑least‑once semantics
  • IoT / streaming data: backpressure‑aware
  • Microservice decoupling: async pipelines

03B Protocol Details

// Frame (pseudo)
struct Header {
  int32 length;
  int16 command; // e.g. SEND
  int64 requestId;
}
// Payload: message body
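
The frame layout above could be encoded and decoded roughly as follows. This is a minimal sketch, not AeroMQ's actual codec: it assumes big‑endian byte order, assumes `length` counts payload bytes only, and uses a hypothetical command code `CMD_SEND = 1`. The `FrameCodec` and `Frame` names are illustrative.

```java
import java.nio.ByteBuffer;

final class FrameCodec {
    // int32 length + int16 command + int64 requestId = 14 bytes
    static final int HEADER_BYTES = Integer.BYTES + Short.BYTES + Long.BYTES;
    static final short CMD_SEND = 1; // hypothetical command code, not taken from the protocol module

    record Frame(short command, long requestId, byte[] payload) {}

    /** Serializes header + payload into a buffer that is ready to be read. */
    static ByteBuffer encode(short command, long requestId, byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(HEADER_BYTES + payload.length); // big-endian by default
        buf.putInt(payload.length); // assumption: length covers the payload only
        buf.putShort(command);
        buf.putLong(requestId);
        buf.put(payload);
        buf.flip();
        return buf;
    }

    /** Returns null (and leaves the position untouched) when a full frame has not arrived yet. */
    static Frame decode(ByteBuffer in) {
        if (in.remaining() < HEADER_BYTES) return null;
        in.mark();
        int length = in.getInt();
        short command = in.getShort();
        long requestId = in.getLong();
        if (length < 0) throw new IllegalArgumentException("negative frame length: " + length);
        if (in.remaining() < length) {
            in.reset(); // wait for more bytes
            return null;
        }
        byte[] payload = new byte[length];
        in.get(payload);
        return new Frame(command, requestId, payload);
    }
}
```

Returning null on a partial frame lets the caller keep accumulating bytes and retry, which is the usual shape of a length‑prefixed decoder.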

04 Quick Start

Windows

install-maven.bat       // optional: installs Maven automatically
quick-start-fixed.bat   // recommended: fixes Chinese character encoding

Linux / macOS

chmod +x quick-start.sh
./quick-start.sh

If Chinese text appears garbled on Windows, run chcp 65001 (the UTF‑8 code page) before running the scripts.

01 ThreadLocal → RequestId → CompletableFuture (High‑Concurrency, Multiple In‑flight Requests)

Goals & Motivation

Replace the ThreadLocal approach and its single‑thread limitation with a per‑connection requestId mapping that routes every response to the exact request that produced it.

Key Components & Data Structures

  • AtomicLong requestIdGen
  • ConcurrentHashMap<Long, CompletableFuture<Response>> pendingRequests
  • Timeout handling: ScheduledExecutorService or DelayQueue
  • The wire protocol carries requestId (frame header or protobuf field)
Client Send Flow
long reqId = requestIdGen.incrementAndGet();
CompletableFuture<Response> f = new CompletableFuture<>();
pendingRequests.put(reqId, f); // register before writing so a fast response always finds its future
// ... serialize (with requestId) & write to channel
scheduler.schedule(() -> {
  // whichever side removes the entry first (timeout or response) completes the future
  var g = pendingRequests.remove(reqId);
  if (g != null) g.completeExceptionally(new TimeoutException());
}, timeout, TimeUnit.MILLISECONDS);
return f;
Client Receive Flow
// decode the frame into requestId + payload
var f = pendingRequests.remove(requestId);
if (f != null) f.complete(response);
else log.warn("late/unknown response"); // already timed out, or never sent on this connection
Broker: echoes the request's requestId in each response; push/notify messages that carry no requestId are delivered through the subscription stream.
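
The send and receive flows can be combined into one small class. The sketch below is an illustration under stated assumptions rather than the client SDK's code: `RequestTracker` is a hypothetical name, `String` stands in for the `Response` type, and the `LongConsumer` parameter stands in for "serialize and write to the channel". It also cancels the timeout task once the future completes, so finished requests do not linger in the scheduler.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongConsumer;

final class RequestTracker {
    private final AtomicLong requestIdGen = new AtomicLong();
    private final ConcurrentHashMap<Long, CompletableFuture<String>> pendingRequests = new ConcurrentHashMap<>();
    private final ScheduledExecutorService scheduler =
        Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "request-timeout");
            t.setDaemon(true); // do not keep the JVM alive just for timeouts
            return t;
        });

    /** Registers a future, arms its timeout, then hands the id to the writer (the channel write). */
    CompletableFuture<String> send(LongConsumer writer, long timeoutMillis) {
        long reqId = requestIdGen.incrementAndGet();
        CompletableFuture<String> f = new CompletableFuture<>();
        pendingRequests.put(reqId, f);
        ScheduledFuture<?> timeout = scheduler.schedule(() -> {
            CompletableFuture<String> g = pendingRequests.remove(reqId);
            if (g != null) g.completeExceptionally(new TimeoutException("request " + reqId));
        }, timeoutMillis, TimeUnit.MILLISECONDS);
        f.whenComplete((result, error) -> timeout.cancel(false)); // drop the timer once done
        try {
            writer.accept(reqId);
        } catch (RuntimeException e) { // a failed write must not leave a dangling entry
            pendingRequests.remove(reqId);
            f.completeExceptionally(e);
        }
        return f;
    }

    /** Called by the decoder; returns false for late or unknown responses. */
    boolean onResponse(long requestId, String response) {
        CompletableFuture<String> f = pendingRequests.remove(requestId);
        return f != null && f.complete(response);
    }

    int pendingCount() {
        return pendingRequests.size();
    }
}
```

Because both the timeout task and `onResponse` go through `pendingRequests.remove`, exactly one of them wins for any given request, so a future is never completed twice.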

Edge Cases & Notes

  • Limit the number of pending requests; above the threshold, reject the request or return a 429‑style error
  • 64‑bit requestId wrap‑around is negligible in practice; if IDs are ever recycled, make sure old entries have been purged first
  • On disconnect, iterate over the pending requests and complete each one exceptionally to avoid resource leaks
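
Two of the notes above, the pending limit and failing everything on disconnect, can be sketched together. This is one possible shape, not the SDK's implementation: `PendingTable` is a hypothetical name, `String` stands in for `Response`, and a `Semaphore` is just one convenient way to enforce the cap.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;

final class PendingTable {
    private final ConcurrentHashMap<Long, CompletableFuture<String>> pending = new ConcurrentHashMap<>();
    private final Semaphore permits;

    PendingTable(int maxPending) {
        this.permits = new Semaphore(maxPending);
    }

    /** Returns null when the limit is reached, so the caller can reject the request. */
    CompletableFuture<String> register(long requestId) {
        if (!permits.tryAcquire()) return null;
        CompletableFuture<String> f = new CompletableFuture<>();
        pending.put(requestId, f);
        return f;
    }

    /** Completes one request; the permit is released only by whoever removes the entry. */
    boolean complete(long requestId, String response) {
        CompletableFuture<String> f = pending.remove(requestId);
        if (f == null) return false;
        permits.release();
        return f.complete(response);
    }

    /** On disconnect: fail every pending request and return how many were failed. */
    int failAll(Throwable cause) {
        int failed = 0;
        for (Long id : pending.keySet()) {
            CompletableFuture<String> f = pending.remove(id);
            if (f != null) {
                permits.release();
                f.completeExceptionally(cause);
                failed++;
            }
        }
        return failed;
    }
}
```

Tying each permit release to a successful `remove` keeps the count accurate even when a response and a disconnect race for the same entry.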
  • 10k+: concurrent requests per connection
  • P99 < 10 ms: end‑to‑end latency
  • 50%+: reduction in GC pressure

02 SPSC Ring Buffer (Lock‑free) with Condition‑based Wakeup

When to Choose SPSC

Messages are routed by key → shard, and each shard has a single consumer. Multiple producers are funneled through per‑connection buffers and aggregated into one SPSC queue.
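
The key → shard step can be illustrated with a small router. This is a sketch under assumptions the text does not state: keys are strings, the shard count is a power of two, and the hash spreading is borrowed from common practice. `ShardRouter` is a hypothetical name.

```java
final class ShardRouter {
    private final int mask;

    ShardRouter(int shardCount) {
        if (shardCount <= 0 || Integer.bitCount(shardCount) != 1) {
            throw new IllegalArgumentException("shardCount must be a power of two");
        }
        this.mask = shardCount - 1;
    }

    /** The same key always maps to the same shard, so per-key order is kept by that shard's single consumer. */
    int shardFor(String key) {
        int h = key.hashCode();
        h ^= (h >>> 16); // fold the high bits in so small masks still see them
        return h & mask; // masking always yields a value in [0, shardCount)
    }
}
```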

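Implementation Notes

  • Capacity is a power of two, so index = seq & mask
  • head and tail are updated by different threads (consumer and producer), which avoids write contention
  • Avoid false sharing (padding or @Contended)
  • Memory ordering: writes publish with release, reads observe with acquire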
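
The release/acquire note can be made explicit with `VarHandle` access modes. The class below is a minimal sketch of that technique, not the broker's queue: `AcqRelSpscRing` is a hypothetical name, and the padding or `@Contended` mentioned above is left out to keep the example short.

```java
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

final class AcqRelSpscRing<E> {
    private static final VarHandle HEAD;
    private static final VarHandle TAIL;
    static {
        try {
            MethodHandles.Lookup lookup = MethodHandles.lookup();
            HEAD = lookup.findVarHandle(AcqRelSpscRing.class, "head", long.class);
            TAIL = lookup.findVarHandle(AcqRelSpscRing.class, "tail", long.class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    private final Object[] slots;
    private final int mask;
    private long head; // next sequence to read; written only by the consumer
    private long tail; // next sequence to write; written only by the producer

    AcqRelSpscRing(int capacity) {
        if (capacity <= 0 || Integer.bitCount(capacity) != 1) {
            throw new IllegalArgumentException("capacity must be a power of two");
        }
        this.slots = new Object[capacity];
        this.mask = capacity - 1;
    }

    /** Producer thread only. */
    boolean offer(E e) {
        long t = tail;                         // plain read: only this thread writes tail
        long h = (long) HEAD.getAcquire(this); // observe slots freed by the consumer
        if (t - h == slots.length) return false; // full
        slots[(int) (t & mask)] = e;
        TAIL.setRelease(this, t + 1);          // publish the slot write
        return true;
    }

    /** Consumer thread only. */
    @SuppressWarnings("unchecked")
    E poll() {
        long h = head;                         // plain read: only this thread writes head
        long t = (long) TAIL.getAcquire(this); // observe slots published by the producer
        if (t == h) return null;               // empty
        int i = (int) (h & mask);
        E e = (E) slots[i];
        slots[i] = null;                       // let the element be collected
        HEAD.setRelease(this, h + 1);          // hand the slot back to the producer
        return e;
    }
}
```

Each index has exactly one writer, so neither side needs compare‑and‑swap. The release store on one side pairs with the acquire load on the other to order the slot accesses.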
// Full / empty checks
available = tail - head
if (available == capacity) full
if (available == 0) empty

// Notify after producing (only on the empty → non-empty transition)
if (wasEmpty) condition.signal();

// Consumer wait (hybrid strategy)
spin a few cycles → condition.await(timeout)
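
The hybrid wait can be sketched with a `ReentrantLock` and `Condition` next to the lock‑free queue. This is an illustration, not the broker's code: `HybridWait` is a hypothetical name, and the `Supplier` stands in for the ring's `poll`. The consumer re‑checks the queue while holding the lock, and the timed wait bounds the delay if the producer's `wasEmpty` view was stale and it skipped the signal.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

final class HybridWait {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();

    /** Producer side: call after a successful offer; signals only on empty → non-empty. */
    void signalIfWasEmpty(boolean wasEmpty) {
        if (!wasEmpty) return; // the consumer is busy draining, so no wakeup is needed
        lock.lock();
        try {
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    /** Consumer side: spin briefly, then block for up to awaitMillis; returns null on timeout. */
    <T> T poll(Supplier<T> source, int spins, long awaitMillis) {
        for (int i = 0; i < spins; i++) {
            T x = source.get();
            if (x != null) return x;
            Thread.onSpinWait();
        }
        lock.lock();
        try {
            long remaining = TimeUnit.MILLISECONDS.toNanos(awaitMillis);
            T x;
            // Re-check under the lock before every wait, which also covers spurious wakeups.
            while ((x = source.get()) == null && remaining > 0) {
                remaining = notEmpty.awaitNanos(remaining);
            }
            return x;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return null;
        } finally {
            lock.unlock();
        }
    }
}
```

The producer takes the lock only on the empty → non‑empty transition, so the hot path stays lock‑free while the queue is non‑empty.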

03 Off‑heap DirectByteBuffer: Pooled for Lower GC

Design (Pool + Slab)

  • Pre‑allocate direct memory in size classes of 256 B / 1 KB / 4 KB / 16 KB
  • Return a BufferRef (id/offset/len); the ring stores references rather than payloads
  • Return buffers to the pool after processing; error paths must release them too
  • Netty's PooledByteBufAllocator is the recommended allocator
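
The size‑class design above can be sketched without Netty to show the idea. This is a deliberately simple illustration, not the broker's pool: `SlabPool` and its `BufferRef(classId, offset, length)` layout are assumptions modeled on the id/offset/len description. It uses `synchronized` for brevity, does not fall back to a larger class when one is exhausted, and does not guard against double release.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

final class SlabPool {
    static final int[] SIZE_CLASSES = {256, 1024, 4096, 16 * 1024};

    /** A reference to pooled memory; queues can store this instead of the payload itself. */
    record BufferRef(int classId, int offset, int length) {}

    private final ByteBuffer[] slabs = new ByteBuffer[SIZE_CLASSES.length];
    private final List<ArrayDeque<Integer>> freeOffsets = new ArrayList<>();

    SlabPool(int slotsPerClass) {
        for (int c = 0; c < SIZE_CLASSES.length; c++) {
            int size = SIZE_CLASSES[c];
            slabs[c] = ByteBuffer.allocateDirect(size * slotsPerClass); // one slab per size class
            ArrayDeque<Integer> offsets = new ArrayDeque<>(slotsPerClass);
            for (int i = 0; i < slotsPerClass; i++) offsets.add(i * size);
            freeOffsets.add(offsets);
        }
    }

    /** Picks the smallest class that fits; returns null if too large or the class is exhausted. */
    synchronized BufferRef acquire(int length) {
        for (int c = 0; c < SIZE_CLASSES.length; c++) {
            if (length <= SIZE_CLASSES[c]) {
                Integer offset = freeOffsets.get(c).poll();
                return offset == null ? null : new BufferRef(c, offset, length);
            }
        }
        return null;
    }

    /** A view over the referenced bytes; no data is copied. */
    ByteBuffer view(BufferRef ref) {
        return slabs[ref.classId()].slice(ref.offset(), ref.length());
    }

    /** Must be called on every path, including error paths. */
    synchronized void release(BufferRef ref) {
        freeOffsets.get(ref.classId()).push(ref.offset());
    }
}
```

A typical caller would wrap the work in `try { ... } finally { pool.release(ref); }` so that error paths return the slot as well.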
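// Send without a heap copy (illustrative)
ByteBuf buf = pooledAllocator.directBuffer(len);
buf.writeBytes(offHeapRef);  // direct-to-direct copy; no intermediate byte[] on the heap
channel.writeAndFlush(buf);  // Netty releases buf after the write completes
// For a strictly zero-copy send, write the pooled buffer itself instead of copying into a new one.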
Monitor direct memory usage and enable Native Memory Tracking (NMT) when needed; watch for fragmentation and leaks.
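
One JDK‑level way to watch direct memory is the platform `BufferPoolMXBean` named "direct". The probe below is a minimal sketch (`DirectMemoryProbe` is a hypothetical name). It reports buffers allocated through `ByteBuffer.allocateDirect`. An allocator that obtains native memory by other means may not show up here, so its own metrics would need to be checked as well. NMT itself is enabled with the JVM flag `-XX:NativeMemoryTracking=summary`.

```java
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;

final class DirectMemoryProbe {
    /** Bytes used by the JVM's "direct" buffer pool, or -1 if that pool is not reported. */
    static long directBytesUsed() {
        for (BufferPoolMXBean pool : ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
            if ("direct".equals(pool.getName())) {
                return pool.getMemoryUsed();
            }
        }
        return -1;
    }
}
```

Sampling this value periodically and exporting it as a gauge makes a slow leak visible as a steadily rising line.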

Memory Stats

// Off-heap MemoryStats (example output)
active=128, allocated=2048, released=1920,
memory=64,000,000 bytes, pools=[small=240, medium=120, large=60]

Edge Cases & Notes

  • Monitor memory fragmentation; adjust the slab configuration if needed
  • Memory allocated directly as a DirectByteBuffer, outside the pool, is not managed by it

05A Observability & Metrics

  • Request: inflight.count, timeout.count
  • Queues: ring.depth, drain.rate
  • Memory: direct.used, pool.{small,medium,large}
  • Network: bytes.in/out, conn.count
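
Counters with names like those above could be kept in a small registry. This is a sketch only: `Counters` is a hypothetical class, not AeroMQ's metrics API, and it assumes simple additive counters backed by `LongAdder` so that hot paths stay cheap under contention.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

final class Counters {
    private final ConcurrentHashMap<String, LongAdder> counters = new ConcurrentHashMap<>();

    /** Adds delta (which may be negative) to the named counter, creating it on first use. */
    void add(String name, long delta) {
        counters.computeIfAbsent(name, k -> new LongAdder()).add(delta);
    }

    long get(String name) {
        LongAdder adder = counters.get(name);
        return adder == null ? 0 : adder.sum();
    }

    /** A point-in-time copy, sorted by name, suitable for logging or export. */
    Map<String, Long> snapshot() {
        TreeMap<String, Long> out = new TreeMap<>();
        counters.forEach((name, adder) -> out.put(name, adder.sum()));
        return out;
    }
}
```

For example, the client could call `add("inflight.count", 1)` when a request is registered and `add("inflight.count", -1)` when it completes or times out.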

05 Benchmarks & Visualization

Run BenchmarkRunner to produce CSV output, then use scripts/visualize_benchmark.py to generate charts and an HTML report.

  • 100k+ msg/s (100 B messages)
  • 50k+ msg/s (1 KB messages)
  • 10k+ msg/s (10 KB messages)

06A Security & Reliability

06 Troubleshooting

06B FAQ

Why not MPSC instead of SPSC?
SPSC is simpler and faster; multiple producers can be funneled through an aggregator thread into a single SPSC queue.
Will requestId overflow?
It is a 64‑bit integer, so in practice it can be treated as never overflowing.
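
A quick calculation shows why overflow is not a practical concern. The snippet assumes IDs start at 1 and count up through the positive range of a `long`, as with `incrementAndGet` on a fresh `AtomicLong`. The rate of one million requests per second is an illustrative figure, not a measured one.

```java
final class RequestIdHeadroom {
    private static final long SECONDS_PER_YEAR = 365L * 24 * 60 * 60;

    /** Whole years until a counter starting near zero reaches Long.MAX_VALUE at a steady rate. */
    static long yearsUntilWrap(long requestsPerSecond) {
        long seconds = Long.MAX_VALUE / requestsPerSecond;
        return seconds / SECONDS_PER_YEAR;
    }
}
```

At one million requests per second, `yearsUntilWrap(1_000_000)` comes out at roughly 292,000 years.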

07 Roadmap

04 Reference Snippets (Pseudo‑code)

// RequestId → CompletableFuture
long reqId = requestIdGen.incrementAndGet();
CompletableFuture<Response> f = new CompletableFuture<>();
pendingRequests.put(reqId, f);
// ... write to channel (with requestId)
scheduler.schedule(() -> {
  var g = pendingRequests.remove(reqId);
  if (g != null) g.completeExceptionally(new TimeoutException());
}, timeout, MILLISECONDS);
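// SPSC Ring Buffer (core idea): one producer thread calls offer, one consumer thread calls poll
final class SpscRingBuffer {
  static final int CAPACITY = 1 << 16;   // power of two
  static final int MASK = CAPACITY - 1;  // index = seq & MASK
  private volatile long head = 0;        // next slot to read; written only by the consumer
  private volatile long tail = 0;        // next slot to write; written only by the producer
  private final Object[] slots = new Object[CAPACITY];

  boolean offer(Object x) {
    long t = tail, h = head;
    if (t - h == CAPACITY) return false; // full
    slots[(int) (t & MASK)] = x;         // plain store into the slot
    tail = t + 1;                        // volatile store publishes the slot (release)
    return true;
  }

  Object poll() {
    long h = head, t = tail;             // volatile load of tail (acquire)
    if (t == h) return null;             // empty
    int i = (int) (h & MASK);
    Object x = slots[i];
    slots[i] = null;                     // drop the reference so it can be collected
    head = h + 1;                        // volatile store hands the slot back to the producer
    return x;
  }
}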